package com.linys.scala.flink

import org.apache.flink.api.scala.ExecutionEnvironment

/**
  * Batch word count implemented with the Flink DataSet API.
  *
  * Reads a text file, splits every line on whitespace, counts how often each
  * distinct word occurs and writes the resulting `(word, count)` pairs as CSV.
  */
object WordCountBatch {

  /** Input path used when no first program argument is supplied. */
  private val DefaultInput = "dir/flink/file.txt"

  /** Output path used when no second program argument is supplied. */
  private val DefaultOutput = "dir/flink/result.txt"

  /**
    * Builds and runs the word-count job.
    *
    * @param args optional positional arguments: `args(0)` is the input path and
    *             `args(1)` the output path; any missing entry falls back to
    *             `DefaultInput` / `DefaultOutput`.
    */
  def main(args: Array[String]): Unit = {
    val input = args.lift(0).getOrElse(DefaultInput)
    val output = args.lift(1).getOrElse(DefaultOutput)

    // Flink execution environment (the entry point for building the job).
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Brings the Scala DataSet types and implicit TypeInformation into scope.
    import org.apache.flink.api.scala._
    import org.apache.flink.core.fs.FileSystem.WriteMode

    // Read the input, one element per line.
    val text: DataSet[String] = env.readTextFile(input)

    // Split on any run of whitespace so tabs and a trailing '\r' (CRLF files)
    // do not end up inside words; a leading separator yields an empty token,
    // which the filter drops.
    val counts: AggregateDataSet[(String, Int)] = text
      .flatMap(_.split("\\s+"))
      .filter(_.nonEmpty)
      .map((_, 1))
      .groupBy(0)
      .sum(1)

    // OVERWRITE lets the job be re-run without deleting the previous result;
    // parallelism 1 makes the sink write a single file.
    counts.writeAsCsv(output, writeMode = WriteMode.OVERWRITE).setParallelism(1)

    // Sinks are lazy: the job only runs when execute() is called.
    env.execute("batch wc!!!")
  }
}
